import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;

/**
 * Flink streaming job that reads strings from a Kafka topic ("test") and
 * counts elements per key in 5-minute processing-time windows, using a
 * custom trigger ({@code TimeCountTrigger}, defined elsewhere) that fires
 * on every arriving element and, absent new elements, at least once per minute.
 */
public class WindowCountOrTimeTrigger
{

    public static void main(String[] args) throws Exception
    {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        // Windows below are driven by processing time, not event time.
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        Properties propsConsumer = new Properties();
        propsConsumer.setProperty("bootstrap.servers", "Desktop:9091,Laptop:9092,Laptop:9093");
        propsConsumer.setProperty("group.id", "trafficwisdom-streaming");
        // Use setProperty with String values: Properties.getProperty() returns null
        // for entries stored via put() with non-String values (Boolean/Integer), so
        // such settings can be silently ignored by config lookups that use getProperty.
        propsConsumer.setProperty("enable.auto.commit", "false");
        propsConsumer.setProperty("max.poll.records", "1000");

        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>("test", new SimpleStringSchema(), propsConsumer);
        consumer.setStartFromLatest();

        DataStream<String> stream = env.addSource(consumer); // attach the Kafka source
        stream.print();

        // MapFunction1 (defined elsewhere) presumably maps each record to a
        // (key, count) pair; records whose key (f0) is null are dropped here.
        DataStream<Tuple2<String, Integer>> exposure =
                stream.map(new MapFunction1()).filter(tuple2 -> tuple2.f0 != null);

        // 5-minute tumbling processing-time window per key; the custom trigger
        // fires on each element (count threshold 1) or at latest every minute,
        // then the per-key counts (field 1) are summed.
        DataStream<Tuple2<String, Integer>> result = exposure
                .keyBy(0)
                .timeWindow(Time.minutes(5))
                .trigger(TimeCountTrigger.of(1, Time.minutes(1)))
                .sum(1);
        result.print();

        env.execute();
    }
}


/*
Code adapted from:
https://blog.csdn.net/baifanwudi/article/details/102937381

Purpose: demonstrate a custom window trigger that
(1) fires the window computation as soon as a single element arrives for a key, and
(2) if no element arrives, still fires the window once every 60 seconds.
*/
